home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- import cupshelpers
- from debug import *
- import socket
- import time
- import gtk
- from timedops import TimedOperation
- import subprocess
-
def wordsep(line):
    """Split *line* into a list of words, shell-style.

    Rules, as implemented below:

    * Runs of whitespace outside quotes separate words.
    * A double quote toggles quoting; whitespace inside quotes is kept
      and the quote characters themselves are dropped.  Quoted and
      unquoted text that touch form a single word (``x"y z"w`` gives
      ``xy zw``).
    * A backslash makes the following character literal (including a
      space, a quote or another backslash) and is itself dropped.  A
      backslash at the very end of the line is ignored.

    An empty quoted word (``""``) is kept when more text follows it on
    the line, but an empty word at the very end of the line is dropped.

    Returns the list of words; an empty or all-whitespace line gives [].
    """
    words = []
    escaped = False   # previous character was an unconsumed backslash
    quoted = False    # currently between a pair of double quotes
    in_word = False   # a word has been started and not yet emitted
    word = ''

    for ch in line:
        if escaped:
            # Whatever follows a backslash is taken literally.
            word += ch
            escaped = False
            continue

        if ch == '\\':
            # A backslash always starts (or continues) a word.
            in_word = True
            escaped = True
            continue

        if in_word:
            if quoted:
                if ch == '"':
                    quoted = False
                else:
                    word += ch
            elif ch.isspace():
                # Unquoted whitespace terminates the current word.
                words.append(word)
                word = ''
                in_word = False
            elif ch == '"':
                quoted = True
            else:
                word += ch
        elif ch == '"':
            # An opening quote starts a new (possibly empty) word.
            in_word = True
            quoted = True
        elif not ch.isspace():
            in_word = True
            word += ch

    # Flush the final word; an empty one is deliberately not emitted.
    if word != '':
        words.append(word)

    return words
-
-
class LpdServer:
    """Probe a host for LPD print queues by trying well-known queue names.

    ``hostname`` may be a bare host name or ``host:port``; without an
    explicit port the LPD port 515 is used.
    """

    def __init__(self, hostname):
        self.hostname = hostname
        # Number of LPTn / COMn queue names to generate (n = 0 .. max-1).
        self.max_lpt_com = 8

    def _open_socket(self):
        """Return a socket connected to the server, or None on failure.

        Every address returned by ``getaddrinfo`` is tried in order and
        the first one that accepts a connection wins.  Sockets for
        addresses that fail are closed before moving on.
        """
        port = 515
        try:
            (host, port) = self.hostname.split(':', 1)
        except ValueError:
            # No ':' in the name: keep the default port.
            host = self.hostname

        try:
            ai = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
                                    socket.SOCK_STREAM)
        except socket.gaierror:
            ai = []

        for res in ai:
            (af, socktype, proto, canonname, sa) = res
            try:
                s = socket.socket(af, socktype, proto)
            except socket.error:
                continue

            try:
                s.settimeout(0.5)
                s.connect(sa)
            except socket.error:
                s.close()
                continue

            # Connected: stop here instead of trying further addresses,
            # which would leak this socket.
            return s

        return None

    def probe_queue(self, name, result):
        """Check whether queue *name* exists on the server.

        Sends the LPD "receive a printer job" command (0x02, see
        RFC 1179) for the queue and treats a reply starting with a zero
        byte as a positive acknowledgement.  In that case the job is
        aborted again (0x01), *name* is appended to *result* and True is
        returned.  Returns False if the connection cannot be made, a
        socket error occurs, or the reply is not an acknowledgement.
        """
        s = self._open_socket()
        if not s:
            return False

        # Diagnostic output kept from the original code.  The
        # single-argument parenthesised form prints exactly what the
        # Python 2 print statement printed.
        print(name)

        try:
            try:
                s.send('\x02%s\n' % name)
                data = s.recv(1024)
                print(repr(data))
            except socket.error as msg:
                print(msg)
                return False

            if len(data) > 0 and ord(data[0]) == 0:
                try:
                    # Abort the job we just announced; best effort only.
                    s.send('\x01\n')
                except socket.error:
                    pass

                result.append(name)
                return True

            return False
        finally:
            # Single clean-up point for every exit path above.
            try:
                s.close()
            except socket.error:
                pass

    def get_possible_queue_names(self):
        """Return the ordered list of queue names worth probing.

        The list starts with a few fixed names (including the empty
        name), followed by LPTn/COMn variants for n below
        ``self.max_lpt_com``, followed by ``pr0`` .. ``pr49``.
        """
        candidate = ['PASSTHRU', 'ps', 'lp', 'PORT1', '']

        for nr in range(self.max_lpt_com):
            candidate.extend(['LPT%d' % nr,
                              'LPT%d_PASSTHRU' % nr,
                              'COM%d' % nr,
                              'COM%d_PASSTHRU' % nr])

        candidate.extend('pr%d' % nr for nr in range(50))
        return candidate

    def probe(self):
        """Probe every candidate queue name and return those that exist.

        Pending GTK events are processed before each probe.  Probing
        stops at the first ``pr<N>`` name that is not found, and pauses
        for 0.1 s between probes.
        """
        result = []
        for name in self.get_possible_queue_names():
            while gtk.events_pending():
                gtk.main_iteration()

            found = self.probe_queue(name, result)
            if not found and name.startswith('pr'):
                break

            time.sleep(0.1)

        return result
-
-
-
class PrinterFinder:
    """Look for printers on a host using several probing methods.

    Each device found is passed to the callback given to ``find`` as a
    ``cupshelpers.Device``; a final call with ``None`` signals that all
    probes have finished.
    """

    def find(self, hostname, callback_fn):
        """Start probing *hostname*, reporting through *callback_fn*.

        The work is handed to ``TimedOperation``; the operation object
        is not used any further here.
        """
        self.hostname = hostname
        self.callback_fn = callback_fn
        TimedOperation(self._do_find)

    def _do_find(self):
        """Run every probe in turn, then signal completion with None.

        An exception from one probe is reported via
        ``nonfatalException`` and does not stop the remaining probes.
        """
        # Attributes learned by one probe and reused by later ones.
        self._cached_attributes = dict()

        for fn in (self._probe_snmp, self._probe_lpd, self._probe_hplip):
            try:
                fn()
            except Exception:
                nonfatalException()

        self.callback_fn(None)

    def _run_probe_command(self, args):
        """Run the external command *args* and return its standard output.

        stdin and stderr are connected to /dev/null.  Returns None when
        the program does not exist or exits with a non-zero status; any
        other OSError from starting it is re-raised.
        """
        # Imported here because the module does not import errno at
        # file level.
        import errno

        with open('/dev/null', 'r+') as null:
            try:
                p = subprocess.Popen(args=args,
                                     stdin=null,
                                     stdout=subprocess.PIPE,
                                     stderr=null)
            except OSError as e:
                # Compare the error *code*, not the exception object.
                if e.errno == errno.ENOENT:
                    return None
                raise

            (stdout, stderr) = p.communicate()

        if p.returncode != 0:
            return None
        return stdout

    def _probe_snmp(self):
        """Report devices listed by the CUPS snmp backend for the host.

        Each output line is expected to hold four or five words (device
        class, URI, make-and-model, info and optionally device ID);
        other lines, such as the empty one after the final newline, are
        skipped.
        """
        stdout = self._run_probe_command(['/usr/lib/cups/backend/snmp',
                                          self.hostname])
        if stdout is None:
            return

        for line in stdout.split('\n'):
            words = wordsep(line)
            n = len(words)
            if n == 5:
                (device_class, uri, make_and_model, info, device_id) = words
            elif n == 4:
                (device_class, uri, make_and_model, info) = words
            else:
                # Not a device line; without this the previous device's
                # values would be reported again (or be undefined).
                continue

            device_dict = {
                'device-class': device_class,
                'device-make-and-model': make_and_model,
                'device-info': info}
            if n == 5:
                device_dict['device-id'] = device_id

            device = cupshelpers.Device(uri, **device_dict)
            self.callback_fn(device)

            # Remember the model so later probes can attach it.
            self._cached_attributes['device-make-and-model'] = make_and_model

    def _probe_lpd(self):
        """Report an ``lpd://`` device for every LPD queue that responds.

        Stops at the first ``pr<N>`` queue name that is not found and
        pauses 0.1 s between probes.
        """
        lpd = LpdServer(self.hostname)
        for name in lpd.get_possible_queue_names():
            found = lpd.probe_queue(name, [])
            if found:
                uri = 'lpd://%s/%s' % (self.hostname, name)
                device_dict = {
                    'device-class': 'network',
                    'device-info': self.hostname}
                device_dict.update(self._cached_attributes)
                new_device = cupshelpers.Device(uri, **device_dict)
                self.callback_fn(new_device)

            if not found and name.startswith('pr'):
                break

            time.sleep(0.1)

    def _probe_hplip(self):
        """Ask ``hp-makeuri`` for a device URI for the host and report it.

        Does nothing when hp-makeuri is missing, fails, or prints
        something that does not look like a URI.
        """
        stdout = self._run_probe_command(['hp-makeuri', '-c', self.hostname])
        if stdout is None:
            return

        uri = stdout.strip()
        if ':' not in uri:
            return

        # NOTE(review): the decompiled source computed `uri` and then
        # discarded it, so this probe could never report anything.  The
        # reporting below mirrors _probe_lpd; confirm the device
        # attributes against the original module.
        device_dict = {
            'device-class': 'network',
            'device-info': self.hostname}
        device_dict.update(self._cached_attributes)
        new_device = cupshelpers.Device(uri, **device_dict)
        self.callback_fn(new_device)
-
-